---
id: Features_CLI_streaming-support
title: Streaming Support in FBOSS2 CLI
---

# Streaming Support in FBOSS2 CLI

## Description

This article introduces how to add a new streaming command. It will also be helpful for people interested in how streaming support is implemented for FBOSS2 CLI.

## Add New Streaming Commands

Please refer to [D37011503](https://www.internalfb.com/diff/D37011503) for a complete implementation of subscribing to an fsdb service.

Here is a step-by-step breakdown:

1. Subclass `BaseCommandTraits`, define the type of data being read in `RetType` and `StreamRetType`. (`StreamRetType` is usually defined as `folly::coro::AsyncGenerator<RetType&&, RetType>`)

2. Subclass `CmdStreamHandler`, override `queryClient` and `printOutput`

3. Register it in `CmdList.cpp` with name and description

4. Add to build target: `fbcode/fboss/cli/fboss2/TARGETS`


## Background Knowledge

The below sections only provide basic background information specific to streaming support. General documentation related to coroutine, usage of `folly::coro`, and thrift can be found in the attached links.

### Implementing `coroutine` with `folly::coro`

Please refer to these two links for more information:

* https://www.internalfb.com/intern/wiki/Coro/

* https://github.com/facebook/folly/tree/main/folly/coro


1. Write a simple coroutine function
Any function that returns a `folly::coro::Task` and has at least one use of `co_await` or `co_return` is a coroutine.
```
#include <folly/coro/Task.h>

folly::coro::Task<int> task42() {
  co_return 42;
}

folly::coro::Task<int> task43() {
  auto value = co_await task42();
  co_return value + 1;
}
```
2. Start a coroutine in a coroutine function using `co_await`
```
#include <folly/coro/Task.h>

int value = 0;

folly::coro::Task<void> asyncSetValue(int newValue) {
  value = newValue;
  co_return;
}

folly::coro::Task<void>  example() {
  auto task = asyncSetValue(42);

  // Coroutine has not yet started executing
  assert(value == 0);

  // It only starts executing when the Task is awaited.
  co_await std::move(task);

  assert(value == 42);
}
```
3. Start a coroutine in a normal function using `folly::coro::blockingWait()`
```
#include <folly/coro/BlockingWait.h>
#include <folly/coro/Task.h>

folly::coro::Task<int> doSomething() {
  co_await folly::coro::sleep(1s);
  co_return 42;
}

void someNonCoroutineFunction() {
  int result = folly::coro::blockingWait(doSomething()));
}
```

### Thrift: Using a client

Please refer to the two links for more information: - https://www.internalfb.com/intern/wiki/Thrift/Thrift_Guide/Thrift_Quick_Start/ - https://www.internalfb.com/intern/wiki/Thrift/Thrift_Guide/UsingAClient/

A few Thrift services involved in the fboss2 CLIs are listed below

* `FbossCtrl` -> `FbossCtrlAsyncClient`: https://www.internalfb.com/code/fbsource/fbcode/fboss/agent/if/ctrl.thrift

* `QsfpService` -> `QsfpServiceAsyncClient`: https://www.internalfb.com/code/fbsource/[bda48995ab34187ec20322810bf609b5cc6c2f25]/fbcode/fboss/qsfp_service/if/qsfp.thrift?lines=8

* `FsdbService` -> `FsdbServiceAsyncClient`: https://www.internalfb.com/code/fbsource/fbcode/fboss/fsdb/if/facebook/fsdb.thrift


Every single command in FBOSS2 CLI functions as a client that subscribes to different Thrift services. In order to retrieve information from a server, we need the following steps (replace Movie with the service name that you want to use):

1. create a ServiceRouter client.
```
#include "servicerouter/client/cpp2/ServiceRouter.h"

auto params = facebook::servicerouter::ClientParams();
auto client = facebook::servicerouter::cpp2::getClientFactory()
                    .getSRClientUnique<MovieServiceAsyncClient>("<your-smc-tier>", params);
```
2. create a request
```
MovieInfoRequest request;
request.title_ref() = "Monty Python";
// COROUTINE USAGE
try {
  MovieInfo r = co_await client->co_getMovieInfo(request);
  LOG(INFO) << "Coroutine response : " << *r.synopsis_ref();
} catch (const std::exception& e) {
  ...
}
// SYNCHRONOUS USAGE
try {
  MovieInfo info;
  client->sync_getMovieInfo(info, request);
  LOG(INFO) << "ReplySync: " << info.get_title();
} catch (MovieNotFoundException ex) {
  LOG(INFO) << "Exception response from sync!";
}
```

### Thrift Streaming

Please refer below for more information:

* https://www.internalfb.com/intern/wiki/Thrift/Thrift_Guide/Streaming/


> A stream is a communication abstraction between a client and a server, one side serving as the producer and the other side serving as the consumer. Streams allow the flow of ordered messages from the producer to the consumer. Each stream has a type, which means that all messages in the stream are objects of this type.


For streaming support, you will mainly work with Stream Publisher like `FsdbService`.

After creating the `FsdbServiceAsyncClient`, we can subscribe `FsdbService` and output results by following the given steps:

1. Send a request to the streaming services and retrieve a generator in a non-streaming comma
A major difference between a single-request single-response RPC and single-request stream-response RPC is the return type on the client-side.
`client->co_subscribeOperStatePath(req)` returns a `ResponseAndClientBufferredStream` instance.
The preferred way to consume the client stream is by converting it to an `AsyncGenerator`.
```
auto gen =
    folly::coro::blockingWait(client->co_subscribeOperStatePath(req));

return std::move(gen).stream.toAsyncGenerator();
```
2. Output data from the stream generator using another coroutine
```
while (auto val = co_await data.next()) {
  cmd.printOutput(*val, out);
}
```

## Implementation of Streaming Support

### Stream generator and how to read from a stream

[D36808860](https://www.internalfb.com/diff/D36808860) [fboss2:streaming support] Add `printJsonStream` with coroutine supported

The above diff implements a coroutine generator through `folly::coro` and shows how to consume data from a generator.

`printJsonStream` can help understand how to work with streaming.

### Streaming command handler

[D36866722](https://www.internalfb.com/diff/D36866722) [fboss: add streaming support] Implement `CmdStreamHandler` for streaming command and a dummy command using AsyncPipe to validate the behavior

The subclassed `CmdStreamHandler` from `CmdHandler` overrides the `run` command.

The original `run` method in `CmdHandler` will call `queryClient` first, and receive direct results from the client service. Then, it outputs the received data by calling `printOutput`.

The `queryClient` method in subclassed `CmdStreamHandler` returns a generator (streaming of data) instead. The `run` method in the `CmdStreamHandler` block waits for a coroutine that keeps reading data from the stream generator and then outputs it.

Here is a simplified implementation for understanding:

```
template <typename CmdTypeT>
folly::coro::Task<void> printJsonStream(
    CmdTypeT& cmd,
    typename CmdTypeT::StreamRetType& data,
    std::ostream& out,
    std::ostream& err) {
    while (auto val = co_await data.next()) {
      cmd.printOutput(*val);
    }
  co_return;
}

void CmdStreamHandler<CmdTypeT, CmdTypeTraits>::run() {
  auto host = getHosts()[0];

  auto hostInfo = HostInfo(host);

  StreamRetType&& result = impl().queryClient(hostInfo);
  folly::coro::blockingWait(
      printJsonStream(impl(), result, std::cout, std::cerr));

}
```
### Understand a dummy streaming command

Please refer to the below diff for a full example.

[D36866722](https://www.internalfb.com/diff/D36866722) [fboss: add streaming support] Implement `CmdStreamHandler` for streaming command and a dummy command using AsyncPipe to validate the behavior

In this example, `queryClient` returns a generator created from `AsyncPipe`. While implementing an actual streaming command, you will replace the generator with a real streaming service.

Please notice that `StreamRetType` is derived from `folly::coro::AsyncGenerator<RetType&&, RetType>`, where `RetType` is the desired type of unit data.

In the non-streaming command, the returned type of `queryClient` and the type of input parameter `model` in `printOutput` is the same.

We can reinterpret the data flow in non-streaming command as

```
printOutput(queryClient(hostInfo))
```
Whereas the data flow in the streaming command is

```
auto generator = queryClient(hostInfo);
while (auto val = co_await generator.next()) {
  printOutput(*val);
}
```
## A few notes

* Intro and quick start of thrift https://www.internalfb.com/intern/wiki/Thrift/Thrift_Guide/Thrift_Quick_Start/

* The thrift streaming guide (https://www.internalfb.com/intern/wiki/Thrift/Thrift_Guide/Streaming/) also provides suggestions on how to do mocking in unit tests.

* Please check the comments here (https://github.com/facebook/folly/blob/main/folly/coro/Generator.h).
It helps better understand the reference type and value type in folly::coro generator.

